Skip to content

Commit

Permalink
refactor(server/v2): Update prepare & process proposal (#21237)
Browse files Browse the repository at this point in the history
(cherry picked from commit 95b8092)

# Conflicts:
#	server/v2/appmanager/appmanager.go
  • Loading branch information
hieuvubk authored and mergify[bot] committed Sep 4, 2024
1 parent 57c6f60 commit 7c7b090
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 51 deletions.
189 changes: 189 additions & 0 deletions server/v2/appmanager/appmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package appmanager

import (
"bytes"

Check failure on line 4 in server/v2/appmanager/appmanager.go

View workflow job for this annotation

GitHub Actions / dependency-review

"bytes" imported and not used

Check failure on line 4 in server/v2/appmanager/appmanager.go

View workflow job for this annotation

GitHub Actions / golangci-lint

"bytes" imported and not used
"context"
"encoding/json"

Check failure on line 6 in server/v2/appmanager/appmanager.go

View workflow job for this annotation

GitHub Actions / dependency-review

"encoding/json" imported and not used

Check failure on line 6 in server/v2/appmanager/appmanager.go

View workflow job for this annotation

GitHub Actions / golangci-lint

"encoding/json" imported and not used
"errors"
"fmt"

"cosmossdk.io/core/server"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
)

// Store defines the underlying storage behavior needed by AppManager.
type Store interface {
// StateLatest returns a readonly view over the latest
// committed state of the store. Alongside the version
// associated with it.
StateLatest() (uint64, corestore.ReaderMap, error)

// StateAt returns a readonly view over the provided
// state. Must error when the version does not exist.
StateAt(version uint64) (corestore.ReaderMap, error)
}

// AppManager is a coordinator for all things related to an application
type AppManager[T transaction.Tx] struct {
config Config

Check failure on line 29 in server/v2/appmanager/appmanager.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: Config

Check failure on line 29 in server/v2/appmanager/appmanager.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: Config

db Store

initGenesis InitGenesis

Check failure on line 33 in server/v2/appmanager/appmanager.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: InitGenesis

Check failure on line 33 in server/v2/appmanager/appmanager.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: InitGenesis
exportGenesis ExportGenesis

Check failure on line 34 in server/v2/appmanager/appmanager.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: ExportGenesis

Check failure on line 34 in server/v2/appmanager/appmanager.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: ExportGenesis

stf StateTransitionFunction[T]

Check failure on line 36 in server/v2/appmanager/appmanager.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: StateTransitionFunction

Check failure on line 36 in server/v2/appmanager/appmanager.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: StateTransitionFunction (typecheck)
}

// InitGenesis initializes the genesis state of the application.
func (a AppManager[T]) InitGenesis(
ctx context.Context,
blockRequest *server.BlockRequest[T],
initGenesisJSON []byte,
txDecoder transaction.Codec[T],
) (*server.BlockResponse, corestore.WriterMap, error) {
v, zeroState, err := a.db.StateLatest()
if err != nil {
return nil, nil, fmt.Errorf("unable to get latest state: %w", err)
}
if v != 0 { // TODO: genesis state may be > 0, we need to set version on store
return nil, nil, errors.New("cannot init genesis on non-zero state")
}

var genTxs []T
genesisState, err := a.stf.RunWithCtx(ctx, zeroState, func(ctx context.Context) error {
return a.initGenesis(ctx, bytes.NewBuffer(initGenesisJSON), func(jsonTx json.RawMessage) error {
genTx, err := txDecoder.DecodeJSON(jsonTx)
if err != nil {
return fmt.Errorf("failed to decode genesis transaction: %w", err)
}
genTxs = append(genTxs, genTx)
return nil
})
})
if err != nil {
return nil, nil, fmt.Errorf("failed to import genesis state: %w", err)
}
// run block
blockRequest.Txs = genTxs

blockResponse, blockZeroState, err := a.stf.DeliverBlock(ctx, blockRequest, genesisState)
if err != nil {
return blockResponse, nil, fmt.Errorf("failed to deliver block %d: %w", blockRequest.Height, err)
}

// after executing block 0, we extract the changes and apply them to the genesis state.
stateChanges, err := blockZeroState.GetStateChanges()
if err != nil {
return nil, nil, fmt.Errorf("failed to get block zero state changes: %w", err)
}

err = genesisState.ApplyStateChanges(stateChanges)
if err != nil {
return nil, nil, fmt.Errorf("failed to apply block zero state changes to genesis state: %w", err)
}

return blockResponse, genesisState, err
}

// ExportGenesis exports the genesis state of the application.
func (a AppManager[T]) ExportGenesis(ctx context.Context, version uint64) ([]byte, error) {
zeroState, err := a.db.StateAt(version)
if err != nil {
return nil, fmt.Errorf("unable to get latest state: %w", err)
}

bz := make([]byte, 0)
_, err = a.stf.RunWithCtx(ctx, zeroState, func(ctx context.Context) error {
if a.exportGenesis == nil {
return errors.New("export genesis function not set")
}

bz, err = a.exportGenesis(ctx, version)
if err != nil {
return fmt.Errorf("failed to export genesis state: %w", err)
}

return nil
})
if err != nil {
return nil, fmt.Errorf("failed to export genesis state: %w", err)
}

return bz, nil
}

func (a AppManager[T]) DeliverBlock(
ctx context.Context,
block *server.BlockRequest[T],
) (*server.BlockResponse, corestore.WriterMap, error) {
latestVersion, currentState, err := a.db.StateLatest()
if err != nil {
return nil, nil, fmt.Errorf("unable to create new state for height %d: %w", block.Height, err)
}

if latestVersion+1 != block.Height {
return nil, nil, fmt.Errorf("invalid DeliverBlock height wanted %d, got %d", latestVersion+1, block.Height)
}

blockResponse, newState, err := a.stf.DeliverBlock(ctx, block, currentState)
if err != nil {
return nil, nil, fmt.Errorf("block delivery failed: %w", err)
}

return blockResponse, newState, nil
}

// ValidateTx will validate the tx against the latest storage state. This means that
// only the stateful validation will be run, not the execution portion of the tx.
// If full execution is needed, Simulate must be used.
func (a AppManager[T]) ValidateTx(ctx context.Context, tx T) (server.TxResult, error) {
_, latestState, err := a.db.StateLatest()
if err != nil {
return server.TxResult{}, err
}
res := a.stf.ValidateTx(ctx, latestState, a.config.ValidateTxGasLimit, tx)
return res, res.Error
}

// Simulate runs validation and execution flow of a Tx.
func (a AppManager[T]) Simulate(ctx context.Context, tx T) (server.TxResult, corestore.WriterMap, error) {
_, state, err := a.db.StateLatest()
if err != nil {
return server.TxResult{}, nil, err
}
result, cs := a.stf.Simulate(ctx, state, a.config.SimulationGasLimit, tx) // TODO: check if this is done in the antehandler
return result, cs, nil
}

// Query queries the application at the provided version.
// CONTRACT: Version must always be provided, if 0, get latest
func (a AppManager[T]) Query(ctx context.Context, version uint64, request transaction.Msg) (transaction.Msg, error) {
// if version is provided attempt to do a height query.
if version != 0 {
queryState, err := a.db.StateAt(version)
if err != nil {
return nil, err
}
return a.stf.Query(ctx, queryState, a.config.QueryGasLimit, request)
}

// otherwise rely on latest available state.
_, queryState, err := a.db.StateLatest()
if err != nil {
return nil, err
}
return a.stf.Query(ctx, queryState, a.config.QueryGasLimit, request)
}

// QueryWithState executes a query with the provided state. This allows to process a query
// independently of the db state. For example, it can be used to process a query with temporary
// and uncommitted state
func (a AppManager[T]) QueryWithState(
ctx context.Context,
state corestore.ReaderMap,
request transaction.Msg,
) (transaction.Msg, error) {
return a.stf.Query(ctx, state, a.config.QueryGasLimit, request)
}
33 changes: 10 additions & 23 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,17 +325,8 @@ func (c *Consensus[T]) PrepareProposal(
return nil, errors.New("PrepareProposal called with invalid height")
}

decodedTxs := make([]T, len(req.Txs))
for i, tx := range req.Txs {
decTx, err := c.txCodec.Decode(tx)
if err != nil {
// TODO: vote extension meta data as a custom type to avoid possibly accepting invalid txs
// continue even if tx decoding fails
c.logger.Error("failed to decode tx", "err", err)
continue
}

decodedTxs[i] = decTx
if c.prepareProposalHandler == nil {
return nil, errors.New("no prepare proposal function was set")
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Expand All @@ -345,7 +336,7 @@ func (c *Consensus[T]) PrepareProposal(
LastCommit: toCoreExtendedCommitInfo(req.LocalLastCommit),
})

txs, err := c.prepareProposalHandler(ciCtx, c.app, decodedTxs, req)
txs, err := c.prepareProposalHandler(ciCtx, c.app, c.txCodec, req)
if err != nil {
return nil, err
}
Expand All @@ -366,16 +357,12 @@ func (c *Consensus[T]) ProcessProposal(
ctx context.Context,
req *abciproto.ProcessProposalRequest,
) (*abciproto.ProcessProposalResponse, error) {
decodedTxs := make([]T, len(req.Txs))
for _, tx := range req.Txs {
decTx, err := c.txCodec.Decode(tx)
if err != nil {
// TODO: vote extension meta data as a custom type to avoid possibly accepting invalid txs
// continue even if tx decoding fails
c.logger.Error("failed to decode tx", "err", err)
continue
}
decodedTxs = append(decodedTxs, decTx)
if req.Height < 1 {
return nil, errors.New("ProcessProposal called with invalid height")
}

if c.processProposalHandler == nil {
return nil, errors.New("no process proposal function was set")
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Expand All @@ -385,7 +372,7 @@ func (c *Consensus[T]) ProcessProposal(
LastCommit: toCoreCommitInfo(req.ProposedLastCommit),
})

err := c.processProposalHandler(ciCtx, c.app, decodedTxs, req)
err := c.processProposalHandler(ciCtx, c.app, c.txCodec, req)
if err != nil {
c.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abciproto.ProcessProposalResponse{
Expand Down
Loading

0 comments on commit 7c7b090

Please sign in to comment.