Skip to content

Commit

Permalink
Merge branch 'main' into julien/homehelper
Browse files Browse the repository at this point in the history
  • Loading branch information
julienrbrt authored Jul 16, 2024
2 parents 24269e7 + 86ea861 commit 3eaf060
Show file tree
Hide file tree
Showing 26 changed files with 300 additions and 139 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i
* (baseapp) [#20208](https://github.com/cosmos/cosmos-sdk/pull/20208) Skip running validateBasic for rechecking txs.
* (baseapp) [#20380](https://github.com/cosmos/cosmos-sdk/pull/20380) Enhanced OfferSnapshot documentation.
* (client) [#20771](https://github.com/cosmos/cosmos-sdk/pull/20771) Remove `ReadDefaultValuesFromDefaultClientConfig` from `client` package. (It was introduced in `v0.50.6` as a quick fix).
* (grpcserver) [#20945](https://github.com/cosmos/cosmos-sdk/pull/20945) Adds error handling for out-of-gas panics in grpc query handlers.

### Bug Fixes

Expand Down
13 changes: 13 additions & 0 deletions baseapp/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/grpc/status"

errorsmod "cosmossdk.io/errors"
storetypes "cosmossdk.io/store/types"

sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
Expand Down Expand Up @@ -67,6 +68,18 @@ func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {

app.logger.Debug("gRPC query received of type: " + fmt.Sprintf("%#v", req))

// Catch an OutOfGasPanic caused in the query handlers
defer func() {
if r := recover(); r != nil {
switch rType := r.(type) {
case storetypes.ErrorOutOfGas:
err = errorsmod.Wrapf(sdkerrors.ErrOutOfGas, "Query gas limit exceeded: %v, out of gas in location: %v", sdkCtx.GasMeter().Limit(), rType.Descriptor)
default:
panic(r)
}
}
}()

return handler(grpcCtx, req)
}

Expand Down
9 changes: 9 additions & 0 deletions runtime/v2/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"

gogoproto "github.com/cosmos/gogoproto/proto"
"golang.org/x/exp/slices"

runtimev2 "cosmossdk.io/api/cosmos/app/runtime/v2"
Expand Down Expand Up @@ -44,6 +45,10 @@ type App[T transaction.Tx] struct {
interfaceRegistrar registry.InterfaceRegistrar
amino legacy.Amino
moduleManager *MM[T]

// GRPCQueryDecoders maps gRPC method name to a function that decodes the request
// bytes into a gogoproto.Message, which then can be passed to appmanager.
GRPCQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)
}

// Logger returns the app logger.
Expand Down Expand Up @@ -109,3 +114,7 @@ func (a *App[T]) ExecuteGenesisTx(_ []byte) error {
func (a *App[T]) GetAppManager() *appmanager.AppManager[T] {
return a.AppManager
}

func (a *App[T]) GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error) {
return a.GRPCQueryDecoders
}
43 changes: 33 additions & 10 deletions runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"reflect"
"sort"

gogoproto "github.com/cosmos/gogoproto/proto"
Expand Down Expand Up @@ -555,17 +556,28 @@ func (m *MM[T]) assertNoForgottenModules(

func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], registry *protoregistry.Files) error {
c := &configurator{
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
err: nil,
grpcQueryDecoders: map[string]func([]byte) (gogoproto.Message, error){},
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
err: nil,
}
return s.RegisterServices(c)

err := s.RegisterServices(c)
if err != nil {
return fmt.Errorf("unable to register services: %w", err)
}
app.GRPCQueryDecoders = c.grpcQueryDecoders
return nil
}

var _ grpc.ServiceRegistrar = (*configurator)(nil)

type configurator struct {
// grpcQueryDecoders is required because module expose queries through gRPC
// this provides a way to route to modules using gRPC.
grpcQueryDecoders map[string]func([]byte) (gogoproto.Message, error)

stfQueryRouter *stf.MsgRouterBuilder
stfMsgRouter *stf.MsgRouterBuilder
registry *protoregistry.Files
Expand Down Expand Up @@ -596,17 +608,28 @@ func (c *configurator) RegisterService(sd *grpc.ServiceDesc, ss interface{}) {
func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{}) error {
for _, md := range sd.Methods {
// TODO(tip): what if a query is not deterministic?
err := registerMethod(c.stfQueryRouter, sd, md, ss)
requestFullName, err := registerMethod(c.stfQueryRouter, sd, md, ss)
if err != nil {
return fmt.Errorf("unable to register query handler %s: %w", md.MethodName, err)
}

// register gRPC query method.
typ := gogoproto.MessageType(requestFullName)
if typ == nil {
return fmt.Errorf("unable to find message in gogotype registry: %w", err)
}
decoderFunc := func(bytes []byte) (gogoproto.Message, error) {
msg := reflect.New(typ.Elem()).Interface().(gogoproto.Message)
return msg, gogoproto.Unmarshal(bytes, msg)
}
c.grpcQueryDecoders[md.MethodName] = decoderFunc
}
return nil
}

func (c *configurator) registerMsgHandlers(sd *grpc.ServiceDesc, ss interface{}) error {
for _, md := range sd.Methods {
err := registerMethod(c.stfMsgRouter, sd, md, ss)
_, err := registerMethod(c.stfMsgRouter, sd, md, ss)
if err != nil {
return fmt.Errorf("unable to register msg handler %s: %w", md.MethodName, err)
}
Expand All @@ -633,13 +656,13 @@ func registerMethod(
sd *grpc.ServiceDesc,
md grpc.MethodDesc,
ss interface{},
) error {
) (string, error) {
requestName, err := requestFullNameFromMethodDesc(sd, md)
if err != nil {
return err
return "", err
}

return stfRouter.RegisterHandler(string(requestName), func(
return string(requestName), stfRouter.RegisterHandler(string(requestName), func(
ctx context.Context,
msg appmodulev2.Message,
) (resp appmodulev2.Message, err error) {
Expand Down
16 changes: 8 additions & 8 deletions server/v2/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"cosmossdk.io/server/v2/api/grpc/gogoreflection"
)

type GRPCServer[AppT serverv2.AppI[T], T transaction.Tx] struct {
type GRPCServer[T transaction.Tx] struct {
logger log.Logger
config *Config
cfgOptions []CfgOption
Expand All @@ -23,15 +23,15 @@ type GRPCServer[AppT serverv2.AppI[T], T transaction.Tx] struct {
}

// New creates a new grpc server.
func New[AppT serverv2.AppI[T], T transaction.Tx](cfgOptions ...CfgOption) *GRPCServer[AppT, T] {
return &GRPCServer[AppT, T]{
func New[T transaction.Tx](cfgOptions ...CfgOption) *GRPCServer[T] {
return &GRPCServer[T]{
cfgOptions: cfgOptions,
}
}

// Init returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server.
func (s *GRPCServer[AppT, T]) Init(appI AppT, v *viper.Viper, logger log.Logger) error {
func (s *GRPCServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error {
cfg := s.Config().(*Config)
if v != nil {
if err := v.Sub(s.Name()).Unmarshal(&cfg); err != nil {
Expand All @@ -57,11 +57,11 @@ func (s *GRPCServer[AppT, T]) Init(appI AppT, v *viper.Viper, logger log.Logger)
return nil
}

func (s *GRPCServer[AppT, T]) Name() string {
func (s *GRPCServer[T]) Name() string {
return "grpc"
}

func (s *GRPCServer[AppT, T]) Config() any {
func (s *GRPCServer[T]) Config() any {
if s.config == nil || s.config == (&Config{}) {
cfg := DefaultConfig()
// overwrite the default config with the provided options
Expand All @@ -75,7 +75,7 @@ func (s *GRPCServer[AppT, T]) Config() any {
return s.config
}

func (s *GRPCServer[AppT, T]) Start(ctx context.Context) error {
func (s *GRPCServer[T]) Start(ctx context.Context) error {
if !s.config.Enable {
return nil
}
Expand All @@ -102,7 +102,7 @@ func (s *GRPCServer[AppT, T]) Start(ctx context.Context) error {
return err
}

func (s *GRPCServer[AppT, T]) Stop(ctx context.Context) error {
func (s *GRPCServer[T]) Stop(ctx context.Context) error {
if !s.config.Enable {
return nil
}
Expand Down
22 changes: 10 additions & 12 deletions server/v2/api/grpcgateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ import (
serverv2 "cosmossdk.io/server/v2"
)

var _ serverv2.ServerComponent[
serverv2.AppI[transaction.Tx], transaction.Tx,
] = (*GRPCGatewayServer[serverv2.AppI[transaction.Tx], transaction.Tx])(nil)
var _ serverv2.ServerComponent[transaction.Tx] = (*GRPCGatewayServer[transaction.Tx])(nil)

const (
// GRPCBlockHeightHeader is the gRPC header for block height.
GRPCBlockHeightHeader = "x-cosmos-block-height"
)

type GRPCGatewayServer[AppT serverv2.AppI[T], T transaction.Tx] struct {
type GRPCGatewayServer[T transaction.Tx] struct {
logger log.Logger
config *Config
cfgOptions []CfgOption
Expand All @@ -37,7 +35,7 @@ type GRPCGatewayServer[AppT serverv2.AppI[T], T transaction.Tx] struct {
}

// New creates a new gRPC-gateway server.
func New[AppT serverv2.AppI[T], T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *GRPCGatewayServer[AppT, T] {
func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *GRPCGatewayServer[T] {
// The default JSON marshaller used by the gRPC-Gateway is unable to marshal non-nullable non-scalar fields.
// Using the gogo/gateway package with the gRPC-Gateway WithMarshaler option fixes the scalar field marshaling issue.
marshalerOption := &gateway.JSONPb{
Expand All @@ -47,7 +45,7 @@ func New[AppT serverv2.AppI[T], T transaction.Tx](grpcSrv *grpc.Server, ir jsonp
AnyResolver: ir,
}

return &GRPCGatewayServer[AppT, T]{
return &GRPCGatewayServer[T]{
GRPCSrv: grpcSrv,
GRPCGatewayRouter: runtime.NewServeMux(
// Custom marshaler option is required for gogo proto
Expand All @@ -65,11 +63,11 @@ func New[AppT serverv2.AppI[T], T transaction.Tx](grpcSrv *grpc.Server, ir jsonp
}
}

func (g *GRPCGatewayServer[AppT, T]) Name() string {
func (g *GRPCGatewayServer[T]) Name() string {
return "grpc-gateway"
}

func (s *GRPCGatewayServer[AppT, T]) Config() any {
func (s *GRPCGatewayServer[T]) Config() any {
if s.config == nil || s.config == (&Config{}) {
cfg := DefaultConfig()
// overwrite the default config with the provided options
Expand All @@ -83,7 +81,7 @@ func (s *GRPCGatewayServer[AppT, T]) Config() any {
return s.config
}

func (s *GRPCGatewayServer[AppT, T]) Init(appI AppT, v *viper.Viper, logger log.Logger) error {
func (s *GRPCGatewayServer[T]) Init(appI serverv2.AppI[transaction.Tx], v *viper.Viper, logger log.Logger) error {
cfg := s.Config().(*Config)
if v != nil {
if err := v.Sub(s.Name()).Unmarshal(&cfg); err != nil {
Expand All @@ -100,7 +98,7 @@ func (s *GRPCGatewayServer[AppT, T]) Init(appI AppT, v *viper.Viper, logger log.
return nil
}

func (s *GRPCGatewayServer[AppT, T]) Start(ctx context.Context) error {
func (s *GRPCGatewayServer[T]) Start(ctx context.Context) error {
if !s.config.Enable {
return nil
}
Expand All @@ -110,7 +108,7 @@ func (s *GRPCGatewayServer[AppT, T]) Start(ctx context.Context) error {
return nil
}

func (s *GRPCGatewayServer[AppT, T]) Stop(ctx context.Context) error {
func (s *GRPCGatewayServer[T]) Stop(ctx context.Context) error {
if !s.config.Enable {
return nil
}
Expand All @@ -119,7 +117,7 @@ func (s *GRPCGatewayServer[AppT, T]) Stop(ctx context.Context) error {
}

// Register implements registers a grpc-gateway server
func (s *GRPCGatewayServer[AppT, T]) Register(r mux.Router) error {
func (s *GRPCGatewayServer[T]) Register(r mux.Router) error {
// configure grpc-gatway server
r.PathPrefix("/").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// Fall back to grpc gateway server.
Expand Down
52 changes: 31 additions & 21 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
gogoproto "github.com/cosmos/gogoproto/proto"

coreappmgr "cosmossdk.io/core/app"
"cosmossdk.io/core/comet"
Expand All @@ -31,6 +32,9 @@ import (
var _ abci.Application = (*Consensus[transaction.Tx])(nil)

type Consensus[T transaction.Tx] struct {
// legacy support for gRPC
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)

app *appmanager.AppManager[T]
cfg Config
store types.Store
Expand All @@ -56,18 +60,28 @@ type Consensus[T transaction.Tx] struct {
func NewConsensus[T transaction.Tx](
app *appmanager.AppManager[T],
mp mempool.Mempool[T],
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error),
store types.Store,
cfg Config,
txCodec transaction.Codec[T],
logger log.Logger,
) *Consensus[T] {
return &Consensus[T]{
mempool: mp,
store: store,
app: app,
cfg: cfg,
txCodec: txCodec,
logger: logger,
grpcQueryDecoders: grpcQueryDecoders,
app: app,
cfg: cfg,
store: store,
logger: logger,
txCodec: txCodec,
streaming: streaming.Manager{},
snapshotManager: nil,
mempool: mp,
lastCommittedHeight: atomic.Int64{},
prepareProposalHandler: nil,
processProposalHandler: nil,
verifyVoteExt: nil,
extendVote: nil,
chainID: "",
}
}

Expand Down Expand Up @@ -150,18 +164,16 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc

// Query implements types.Application.
// It is called by cometbft to query application state.
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (*abciproto.QueryResponse, error) {
// follow the query path from here
decodedMsg, err := c.txCodec.Decode(req.Data)
protoMsg, ok := any(decodedMsg).(transaction.Msg)
if !ok {
return nil, fmt.Errorf("decoded type T %T must implement core/transaction.Msg", decodedMsg)
}

// if no error is returned then we can handle the query with the appmanager
// otherwise it is a KV store query
if err == nil {
res, err := c.app.Query(ctx, uint64(req.Height), protoMsg)
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
// check if it's a gRPC method
grpcQueryDecoder, isGRPC := c.grpcQueryDecoders[req.Path]
if isGRPC {
protoRequest, err := grpcQueryDecoder(req.Data)
if err != nil {
return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err)
}
res, err := c.app.Query(ctx, uint64(req.Height), protoRequest)

if err != nil {
resp := queryResult(err)
resp.Height = req.Height
Expand All @@ -179,8 +191,6 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
return QueryResult(errorsmod.Wrap(cometerrors.ErrUnknownRequest, "no query path provided"), c.cfg.Trace), nil
}

var resp *abciproto.QueryResponse

switch path[0] {
case cmtservice.QueryPathApp:
resp, err = c.handlerQueryApp(ctx, path, req)
Expand Down Expand Up @@ -391,7 +401,7 @@ func (c *Consensus[T]) FinalizeBlock(
// ProposerAddress: req.ProposerAddress,
// LastCommit: req.DecidedLastCommit,
// },
//}
// }
//
// ctx = context.WithValue(ctx, corecontext.CometInfoKey, &comet.Info{
// Evidence: sdktypes.ToSDKEvidence(req.Misbehavior),
Expand Down
Loading

0 comments on commit 3eaf060

Please sign in to comment.